Launch a Spark session inside the Jupyter notebook


In [1]:
!ls -l $SPARK_HOME

In [2]:
# Note: set SPARK_HOME to Spark binaries before launching the Jupyter session.
import os, sys
SPARK_HOME = os.environ['SPARK_HOME']
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib", "py4j-0.10.4-src.zip"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
print("Spark version: ", spark.version)


Spark version:  2.2.0
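
As a side note, if the findspark package happens to be installed (an assumption; it is not required here), the manual sys.path manipulation above can be replaced by a short sketch like this:

import findspark
findspark.init()  # locates SPARK_HOME and puts pyspark/py4j on sys.path

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()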

In [3]:
spark.sparkContext.uiWebUrl


Out[3]:
'http://192.168.1.6:4040'

Import libraries


In [4]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.pipeline import Pipeline

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import evaluation
from pyspark.sql.functions import * 

import pandas as pd
import pyspark
import numpy as np

In [5]:
pd.__version__, np.__version__, pyspark.__version__


Out[5]:
('0.21.0', '1.13.3', '2.2.0')

Check the versions of the libraries. For this notebook, I am using Spark 2.2.0.

Load Dataset

You can download the dataset from here


In [6]:
credit = spark.read.options(header = True, inferSchema = True).csv("/data/credit-default.csv").cache()
print("Total number of records: ", credit.count())
credit.limit(10).toPandas().head() 
# Take 10 sample records from the Spark dataframe into a pandas dataframe to display the values.
# I prefer the pandas dataframe display to the output of the Spark dataframe show() function.


Total number of records:  1000
Out[6]:
checking_balance months_loan_duration credit_history purpose amount savings_balance employment_length installment_rate personal_status other_debtors ... property age installment_plan housing existing_credits default dependents telephone foreign_worker job
0 < 0 DM 6 critical radio/tv 1169 unknown > 7 yrs 4 single male none ... real estate 67 none own 2 1 1 yes yes skilled employee
1 1 - 200 DM 48 repaid radio/tv 5951 < 100 DM 1 - 4 yrs 2 female none ... real estate 22 none own 1 2 1 none yes skilled employee
2 unknown 12 critical education 2096 < 100 DM 4 - 7 yrs 2 single male none ... real estate 49 none own 1 1 2 none yes unskilled resident
3 < 0 DM 42 repaid furniture 7882 < 100 DM 4 - 7 yrs 2 single male guarantor ... building society savings 45 none for free 1 1 2 none yes skilled employee
4 < 0 DM 24 delayed car (new) 4870 < 100 DM 1 - 4 yrs 3 single male none ... unknown/none 53 none for free 2 2 2 none yes skilled employee

5 rows × 21 columns

View the schema


In [7]:
credit.printSchema()


root
 |-- checking_balance: string (nullable = true)
 |-- months_loan_duration: integer (nullable = true)
 |-- credit_history: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- savings_balance: string (nullable = true)
 |-- employment_length: string (nullable = true)
 |-- installment_rate: integer (nullable = true)
 |-- personal_status: string (nullable = true)
 |-- other_debtors: string (nullable = true)
 |-- residence_history: integer (nullable = true)
 |-- property: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- installment_plan: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- existing_credits: integer (nullable = true)
 |-- default: integer (nullable = true)
 |-- dependents: integer (nullable = true)
 |-- telephone: string (nullable = true)
 |-- foreign_worker: string (nullable = true)
 |-- job: string (nullable = true)

As we can see, there are a number of columns of string type - checking_balance, credit_history, etc.

Let me define a custom Estimator that takes each categorical column, passes it through a StringIndexer and a OneHotEncoder, and keeps the same column name as the original categorical column. Its fitted model's transform returns a new dataframe in which every categorical column has been replaced by its one-hot encoded vector.

Find all columns of string datatype.

Transform each string column into a one-hot encoded vector, and collect the distinct values (labels) of each categorical column in a list, as shown below.
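
Before wrapping this into a reusable class, here is a minimal sketch of what the two stages do to a single column (the column purpose is picked arbitrarily; the _idx/_ohe output names are just for illustration):

# Index the string values of one categorical column, then one-hot encode the index
indexed = StringIndexer(inputCol="purpose", outputCol="purpose_idx").fit(credit).transform(credit)
encoded = OneHotEncoder(inputCol="purpose_idx", outputCol="purpose_ohe", dropLast=True).transform(indexed)
encoded.select("purpose", "purpose_idx", "purpose_ohe").show(3, truncate=False)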


In [8]:
cols = credit.columns
cols.remove("default")
cols


Out[8]:
['checking_balance',
 'months_loan_duration',
 'credit_history',
 'purpose',
 'amount',
 'savings_balance',
 'employment_length',
 'installment_rate',
 'personal_status',
 'other_debtors',
 'residence_history',
 'property',
 'age',
 'installment_plan',
 'housing',
 'existing_credits',
 'dependents',
 'telephone',
 'foreign_worker',
 'job']

In [9]:
from pyspark.ml import Model, Estimator 

class DFOneHotEncoderModel(Model):
    # Fitted model produced by DFOneHotEncoder below: its transform replaces every
    # categorical column with its indexed / one-hot encoded vector, keeping the
    # original column name.
    
    def get_col_labels(self):
        
        cols = []
        feature_columns = [c for c in self.columns if not c == self.label_column]
        
        for col in feature_columns:
            if col in self.categorical_fields:
                string_indexer, _ = self.categorical_fields[col]
                values = string_indexer.labels
                values = values[:-1] if self.drop_last else values
                values = [col + "_" + v for v in values]
                cols.extend(values)
            else:
                cols.append(col) 
            
        return cols
    
    def transform(self, df, params= None):
        
        for colname in self.categorical_fields:
            string_indexer, one_hot_encoder = self.categorical_fields[colname]
        
            df = string_indexer.transform(df)
            df = df.drop(colname)
            df = df.withColumnRenamed(colname + "_idx", colname)

            if one_hot_encoder:
                df = one_hot_encoder.transform(df)
                df = df.drop(colname)
                df = df.withColumnRenamed(colname + "_ohe", colname)
                
        return df
        
class DFOneHotEncoder(Estimator):
    # Estimator that fits a StringIndexer (and, optionally, a OneHotEncoder) for each
    # string column, or for the columns explicitly listed in categorical_fields.
    
    def __init__(self, label_column, categorical_fields= None,  one_hot = True, drop_last = True):
        self.categorical_fields = None
        self.one_hot = one_hot
        self.drop_last = drop_last
        self.label_column = label_column 
        
        if categorical_fields is not None:
            self.categorical_fields = dict([(c, None) for c in categorical_fields])     

    def fit(self, df):
        cols = df.columns
        if self.categorical_fields is None:
            self.categorical_fields = dict([(col, None) for col, dtype in df.dtypes if dtype == "string"])
        
        
        for colname in self.categorical_fields:
            string_indexer = StringIndexer(inputCol=colname, outputCol= colname + "_idx").fit(df)
            
            one_hot_encoder = None
            if self.one_hot:
                one_hot_encoder = OneHotEncoder(inputCol=colname
                                            , outputCol=colname + "_ohe" , dropLast = self.drop_last)

            self.categorical_fields[colname] = (string_indexer, one_hot_encoder)
            

        model = DFOneHotEncoderModel()
        model.categorical_fields = self.categorical_fields
        model.one_hot = self.one_hot
        model.drop_last = self.drop_last
        model.columns = cols
        model.label_column = self.label_column
        
        return model

In [10]:
model = DFOneHotEncoder(label_column = "default").fit(credit)
df = model.transform(credit)
print(df.dtypes)
print("\n")
print(model.get_col_labels())


[('months_loan_duration', 'int'), ('amount', 'int'), ('installment_rate', 'int'), ('residence_history', 'int'), ('age', 'int'), ('existing_credits', 'int'), ('default', 'int'), ('dependents', 'int'), ('checking_balance', 'vector'), ('credit_history', 'vector'), ('purpose', 'vector'), ('savings_balance', 'vector'), ('employment_length', 'vector'), ('personal_status', 'vector'), ('other_debtors', 'vector'), ('property', 'vector'), ('installment_plan', 'vector'), ('housing', 'vector'), ('telephone', 'vector'), ('foreign_worker', 'vector'), ('job', 'vector')]


['checking_balance_unknown', 'checking_balance_< 0 DM', 'checking_balance_1 - 200 DM', 'months_loan_duration', 'credit_history_repaid', 'credit_history_critical', 'credit_history_delayed', 'credit_history_fully repaid this bank', 'purpose_radio/tv', 'purpose_car (new)', 'purpose_furniture', 'purpose_car (used)', 'purpose_business', 'purpose_education', 'purpose_repairs', 'purpose_others', 'purpose_domestic appliances', 'amount', 'savings_balance_< 100 DM', 'savings_balance_unknown', 'savings_balance_101 - 500 DM', 'savings_balance_501 - 1000 DM', 'employment_length_1 - 4 yrs', 'employment_length_> 7 yrs', 'employment_length_4 - 7 yrs', 'employment_length_0 - 1 yrs', 'installment_rate', 'personal_status_single male', 'personal_status_female', 'personal_status_married male', 'other_debtors_none', 'other_debtors_guarantor', 'residence_history', 'property_other', 'property_real estate', 'property_building society savings', 'age', 'installment_plan_none', 'installment_plan_bank', 'housing_own', 'housing_rent', 'existing_credits', 'dependents', 'telephone_none', 'foreign_worker_yes', 'job_skilled employee', 'job_unskilled resident', 'job_mangement self-employed']

Verify that all columns in df are either numeric or of numeric vector type


In [11]:
df.printSchema()


root
 |-- months_loan_duration: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- installment_rate: integer (nullable = true)
 |-- residence_history: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- existing_credits: integer (nullable = true)
 |-- default: integer (nullable = true)
 |-- dependents: integer (nullable = true)
 |-- checking_balance: vector (nullable = true)
 |-- credit_history: vector (nullable = true)
 |-- purpose: vector (nullable = true)
 |-- savings_balance: vector (nullable = true)
 |-- employment_length: vector (nullable = true)
 |-- personal_status: vector (nullable = true)
 |-- other_debtors: vector (nullable = true)
 |-- property: vector (nullable = true)
 |-- installment_plan: vector (nullable = true)
 |-- housing: vector (nullable = true)
 |-- telephone: vector (nullable = true)
 |-- foreign_worker: vector (nullable = true)
 |-- job: vector (nullable = true)
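
Besides eyeballing the schema, a quick programmatic check (a small sketch) is to assert that no string columns are left after the encoding:

remaining_strings = [c for c, t in df.dtypes if t == "string"]
assert not remaining_strings, "unexpected string columns: %s" % remaining_strings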

Create a list of all columns except the label column (done above as cols).

Use a VectorAssembler to combine all feature columns into a single features vector column.


In [12]:
df_vect = VectorAssembler(inputCols = cols, outputCol="features").transform(df)
df_vect.select("features", "default").limit(5).toPandas()


Out[12]:
features default
0 (0.0, 1.0, 0.0, 6.0, 0.0, 1.0, 0.0, 0.0, 1.0, ... 1
1 (0.0, 0.0, 1.0, 48.0, 1.0, 0.0, 0.0, 0.0, 1.0,... 2
2 (1.0, 0.0, 0.0, 12.0, 0.0, 1.0, 0.0, 0.0, 0.0,... 1
3 (0.0, 1.0, 0.0, 42.0, 1.0, 0.0, 0.0, 0.0, 0.0,... 1
4 (0.0, 1.0, 0.0, 24.0, 0.0, 0.0, 1.0, 0.0, 0.0,... 2

Let me spot-check whether the one-hot encoding worked correctly.


In [13]:
credit.first()


Out[13]:
Row(checking_balance='< 0 DM', months_loan_duration=6, credit_history='critical', purpose='radio/tv', amount=1169, savings_balance='unknown', employment_length='> 7 yrs', installment_rate=4, personal_status='single male', other_debtors='none', residence_history=4, property='real estate', age=67, installment_plan='none', housing='own', existing_credits=2, default=1, dependents=1, telephone='yes', foreign_worker='yes', job='skilled employee')

In [14]:
pd.DataFrame({"feature": model.get_col_labels(), "value": df_vect.select("features").first().features})


Out[14]:
feature value
0 checking_balance_unknown 0.0
1 checking_balance_< 0 DM 1.0
2 checking_balance_1 - 200 DM 0.0
3 months_loan_duration 6.0
4 credit_history_repaid 0.0
5 credit_history_critical 1.0
6 credit_history_delayed 0.0
7 credit_history_fully repaid this bank 0.0
8 purpose_radio/tv 1.0
9 purpose_car (new) 0.0
10 purpose_furniture 0.0
11 purpose_car (used) 0.0
12 purpose_business 0.0
13 purpose_education 0.0
14 purpose_repairs 0.0
15 purpose_others 0.0
16 purpose_domestic appliances 0.0
17 amount 1169.0
18 savings_balance_< 100 DM 0.0
19 savings_balance_unknown 1.0
20 savings_balance_101 - 500 DM 0.0
21 savings_balance_501 - 1000 DM 0.0
22 employment_length_1 - 4 yrs 0.0
23 employment_length_> 7 yrs 1.0
24 employment_length_4 - 7 yrs 0.0
25 employment_length_0 - 1 yrs 0.0
26 installment_rate 4.0
27 personal_status_single male 1.0
28 personal_status_female 0.0
29 personal_status_married male 0.0
30 other_debtors_none 1.0
31 other_debtors_guarantor 0.0
32 residence_history 4.0
33 property_other 0.0
34 property_real estate 1.0
35 property_building society savings 0.0
36 age 67.0
37 installment_plan_none 1.0
38 installment_plan_bank 0.0
39 housing_own 1.0
40 housing_rent 0.0
41 existing_credits 2.0
42 dependents 1.0
43 telephone_none 0.0
44 foreign_worker_yes 1.0
45 job_skilled employee 1.0
46 job_unskilled resident 0.0
47 job_mangement self-employed 0.0

In [15]:
df_train, df_test = df_vect.randomSplit(weights=[0.7, 0.3], seed=1)
df_train.count(), df_test.count()


Out[15]:
(704, 296)

Build a Random Forest classifier


In [16]:
forest = RandomForestClassifier(labelCol="default", featuresCol="features", seed = 123)
forest_model = forest.fit(df_train)
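
The classifier above runs with Spark's default hyperparameters. As a sketch, parameters such as numTrees and maxDepth can be set explicitly (the values below are illustrative, not tuned):

forest_tuned = RandomForestClassifier(labelCol="default", featuresCol="features",
                                      seed=123, numTrees=100, maxDepth=6)
# forest_model = forest_tuned.fit(df_train)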

Run predictions on the held-out test dataset


In [17]:
df_test_pred = forest_model.transform(df_test)
df_test_pred.show(5)


+--------------------+------+----------------+-----------------+---+----------------+-------+----------+----------------+--------------+-------------+---------------+-----------------+---------------+-------------+-------------+----------------+-------------+-------------+--------------+-------------+--------------------+--------------------+--------------------+----------+
|months_loan_duration|amount|installment_rate|residence_history|age|existing_credits|default|dependents|checking_balance|credit_history|      purpose|savings_balance|employment_length|personal_status|other_debtors|     property|installment_plan|      housing|    telephone|foreign_worker|          job|            features|       rawPrediction|         probability|prediction|
+--------------------+------+----------------+-----------------+---+----------------+-------+----------+----------------+--------------+-------------+---------------+-----------------+---------------+-------------+-------------+----------------+-------------+-------------+--------------+-------------+--------------------+--------------------+--------------------+----------+
|                   4|  1544|               2|                1| 42|               3|      1|         2|   (3,[0],[1.0])| (4,[1],[1.0])|(9,[0],[1.0])|  (4,[0],[1.0])|    (4,[2],[1.0])|  (3,[0],[1.0])|(2,[0],[1.0])|(3,[1],[1.0])|   (2,[0],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])| (1,[0],[1.0])|(3,[1],[1.0])|(48,[0,3,5,8,17,1...|[0.0,18.211093191...|[0.0,0.9105546595...|       1.0|
|                   6|   362|               4|                4| 52|               2|      1|         1|   (3,[0],[1.0])| (4,[1],[1.0])|(9,[1],[1.0])|  (4,[2],[1.0])|    (4,[0],[1.0])|  (3,[1],[1.0])|(2,[0],[1.0])|(3,[0],[1.0])|   (2,[0],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])| (1,[0],[1.0])|(3,[1],[1.0])|(48,[0,3,5,9,17,2...|[0.0,17.371032390...|[0.0,0.8685516195...|       1.0|
|                   6|   454|               3|                1| 22|               1|      1|         1|   (3,[2],[1.0])| (4,[0],[1.0])|(9,[6],[1.0])|  (4,[0],[1.0])|    (4,[3],[1.0])|  (3,[2],[1.0])|(2,[0],[1.0])|(3,[2],[1.0])|   (2,[0],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])| (1,[0],[1.0])|(3,[1],[1.0])|(48,[2,3,4,14,17,...|[0.0,13.743747374...|[0.0,0.6871873687...|       1.0|
|                   6|   518|               3|                1| 29|               1|      1|         1|   (3,[0],[1.0])| (4,[0],[1.0])|(9,[0],[1.0])|  (4,[0],[1.0])|    (4,[0],[1.0])|  (3,[1],[1.0])|(2,[0],[1.0])|(3,[1],[1.0])|   (2,[0],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])| (1,[0],[1.0])|(3,[0],[1.0])|(48,[0,3,4,8,17,1...|[0.0,17.285335835...|[0.0,0.8642667917...|       1.0|
|                   6|   609|               4|                3| 37|               2|      1|         1|   (3,[1],[1.0])| (4,[1],[1.0])|(9,[1],[1.0])|  (4,[0],[1.0])|    (4,[2],[1.0])|  (3,[1],[1.0])|(2,[0],[1.0])|(3,[2],[1.0])|   (2,[0],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])|     (1,[],[])|(3,[0],[1.0])|(48,[1,3,5,9,17,1...|[0.0,15.599321361...|[0.0,0.7799660680...|       1.0|
+--------------------+------+----------------+-----------------+---+----------------+-------+----------+----------------+--------------+-------------+---------------+-----------------+---------------+-------------+-------------+----------------+-------------+-------------+--------------+-------------+--------------------+--------------------+--------------------+----------+
only showing top 5 rows

Confusion matrix


In [18]:
df_test_pred.groupBy("default").pivot("prediction").count().show()


+-------+---+---+
|default|1.0|2.0|
+-------+---+---+
|      1|197| 14|
|      2| 67| 18|
+-------+---+---+
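
The crosstab shows that most of the misclassifications are records with default = 2 predicted as 1.0. If per-class precision and recall are of interest, one option (a sketch using the older pyspark.mllib API) is MulticlassMetrics:

from pyspark.mllib.evaluation import MulticlassMetrics

# MulticlassMetrics expects an RDD of (prediction, label) pairs of doubles
pred_and_label = df_test_pred.select(col("prediction"), col("default").cast("double")).rdd.map(tuple)
metrics = MulticlassMetrics(pred_and_label)
print("Precision for class 2.0: ", metrics.precision(2.0))
print("Recall for class 2.0:    ", metrics.recall(2.0))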

Evaluate


In [19]:
evaluator = evaluation.MulticlassClassificationEvaluator(labelCol="default", 
                                        metricName="accuracy", predictionCol="prediction")
evaluator.evaluate(df_test_pred)


Out[19]:
0.7263513513513513
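
Accuracy alone can be a bit misleading here since the two classes are imbalanced (roughly 70/30). The same evaluator supports other metrics through metricName, for example f1; a short sketch:

f1 = evaluation.MulticlassClassificationEvaluator(labelCol="default", metricName="f1",
                                                  predictionCol="prediction").evaluate(df_test_pred)
print("F1: ", f1)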

In [20]:
print("Total number of features: ", forest_model.numFeatures, "\nOrder of feature importance: \n")
pd.DataFrame({"importance": forest_model.featureImportances.toArray(), 
              "feature": model.get_col_labels()
             }).sort_values("importance", ascending = False)


Total number of features:  48 
Order of feature importance: 

Out[20]:
feature importance
0 checking_balance_unknown 0.137719
17 amount 0.108318
3 months_loan_duration 0.107306
1 checking_balance_< 0 DM 0.084107
5 credit_history_critical 0.050758
36 age 0.037456
19 savings_balance_unknown 0.034928
2 checking_balance_1 - 200 DM 0.033914
32 residence_history 0.028649
9 purpose_car (new) 0.027793
38 installment_plan_bank 0.024021
27 personal_status_single male 0.020772
34 property_real estate 0.019711
40 housing_rent 0.019111
41 existing_credits 0.017382
37 installment_plan_none 0.016538
26 installment_rate 0.016347
7 credit_history_fully repaid this bank 0.015123
20 savings_balance_101 - 500 DM 0.014589
47 job_mangement self-employed 0.013875
39 housing_own 0.012741
25 employment_length_0 - 1 yrs 0.010733
43 telephone_none 0.010121
18 savings_balance_< 100 DM 0.009821
28 personal_status_female 0.009779
21 savings_balance_501 - 1000 DM 0.009693
31 other_debtors_guarantor 0.009417
33 property_other 0.008785
30 other_debtors_none 0.007644
10 purpose_furniture 0.007480
13 purpose_education 0.006903
46 job_unskilled resident 0.006763
24 employment_length_4 - 7 yrs 0.006574
4 credit_history_repaid 0.005442
45 job_skilled employee 0.004831
6 credit_history_delayed 0.004603
11 purpose_car (used) 0.004378
15 purpose_others 0.004144
22 employment_length_1 - 4 yrs 0.004059
29 personal_status_married male 0.003893
8 purpose_radio/tv 0.003618
35 property_building society savings 0.003350
12 purpose_business 0.003306
42 dependents 0.003301
14 purpose_repairs 0.002724
23 employment_length_> 7 yrs 0.002723
44 foreign_worker_yes 0.002495
16 purpose_domestic appliances 0.002262

Building a pipeline


In [21]:
from pyspark.ml.pipeline import Pipeline, PipelineModel

In [23]:
credit = spark.read.options(header = True, inferSchema = True).csv("/data/credit-default.csv").cache()

label_col = "default"
feature_cols = credit.columns
feature_cols.remove(label_col)

df_train, df_test = credit.randomSplit(weights=[0.7, 0.3], seed=1)


pipeline = Pipeline()
print(pipeline.explainParams())
encoder = DFOneHotEncoder(label_column = label_col)
vectorizer = VectorAssembler(inputCols = feature_cols, outputCol="features")
forest = RandomForestClassifier(labelCol="default", featuresCol="features", seed = 123)

pipeline.setStages([encoder, vectorizer, forest])
pipelineModel = pipeline.fit(df_train)
df_test_pred = pipelineModel.transform(df_test)
evaluator = evaluation.MulticlassClassificationEvaluator(labelCol="default", 
                                        metricName="accuracy", predictionCol="prediction")

accuracy = evaluator.evaluate(df_test_pred)
print("Accuracy", accuracy)


stages: a list of pipeline stages (undefined)
Accuracy 0.7601351351351351
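
Since the fitted stages are available through pipelineModel.stages (in the order given to setStages), the feature-importance table from earlier can be rebuilt from the pipeline as well; a sketch:

encoder_model = pipelineModel.stages[0]    # fitted DFOneHotEncoderModel
rf_model = pipelineModel.stages[-1]        # fitted RandomForestClassificationModel

pd.DataFrame({"feature": encoder_model.get_col_labels(),
              "importance": rf_model.featureImportances.toArray()
             }).sort_values("importance", ascending=False).head(10)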
